package com.atguigu.gmall.realtime.app;

import com.atguigu.gmall.realtime.util.FlinkSourceUtil;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @Author lizhenchao@atguigu.cn
 * @Date 2021/8/24 10:07
 */
public abstract class BaseAppV1 {
    protected abstract void run(StreamExecutionEnvironment env,
                                DataStreamSource<String> sourceStream);
    
    protected void init( int port,
                         int p,
                         String ck,
                         String groupId,
                         String topic){
        System.setProperty("HADOOP_USER_NAME", "atguigu");
        Configuration conf = new Configuration();
        conf.setInteger("rest.port", port);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.setParallelism(p);
    
        env.enableCheckpointing(3000);
        env.setStateBackend(new HashMapStateBackend());
        env.getCheckpointConfig().setCheckpointStorage("hdfs://hadoop162:8020/gmall/ck/" + ck);
        env
            .getCheckpointConfig()
            .enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    
    
        DataStreamSource<String> sourceStream = env.addSource(FlinkSourceUtil.getKafkaSource(groupId, topic));
    
        // 业务处理不一样
        run(env, sourceStream);
        
        try {
            env.execute(ck);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
